package com.nx.platform.es.biz.esspider.sink;

import com.google.common.base.Preconditions;
import com.google.common.collect.Table;
import com.google.common.collect.Tables;
import com.google.common.primitives.Longs;
import com.nx.platform.es.common.utils.ESHelper;
import com.nx.platform.es.common.utils.MoreSplitters;
import com.nx.platform.es.biz.esspider.entity.Item;
import com.nx.platform.es.common.utils.MoreFunctions;
import com.nx.platform.es.common.utils.MoreMaps;
import com.nx.platform.es.service.ESClientManager;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;
import org.jetbrains.annotations.NotNull;

import java.math.BigDecimal;
import java.util.*;
import java.util.stream.Collectors;

/**
 * ES 写入函数
 * @author
 * @date 2018/01/25
 */
@SinkDefine(SinkType.ES)
public class ESSink extends AbstractESSink implements ESHelper {

    private static final String FIELDS = "fields";
    private static final String INDEX = "index";
    private static final String TYPE = "type";
    private static final String OPEN = "open";

    private Set<Table.Cell<String, Object, String>> fields;
    private String index;
    private String type;
    private boolean open;

    public ESSink(String identity, ESClientManager esManager) {
        super(identity, esManager);
    }

    @Override
    public void init(Map<?, ?> settings) throws Exception {

        open = MoreMaps.getBooleanValue(settings, OPEN, true);

        index = MoreMaps.getString(settings, INDEX);
        Preconditions.checkArgument(StringUtils.isNotBlank(index), "index null or empty");

        type = MoreMaps.getString(settings, TYPE);
        Preconditions.checkArgument(StringUtils.isNotBlank(type), "type null or empty");

        String[] fieldsArr = MoreMaps.getStringArray(settings, FIELDS, MoreSplitters.COMMA);
        Preconditions.checkArgument(fieldsArr != null && fieldsArr.length > 0, "fields null or empty");
        fields = new LinkedHashSet<>(fieldsArr.length);
        for (String field : fieldsArr) {
            String[] ff = MoreSplitters.COMMA.split(field, 2);
            if (ff.length > 1) {
                switch (ff[1]) {
                    case "{}":
                        fields.add(Tables.immutableCell(ff[0], new HashMap<>(0), ""));
                        break;
                    case "true":
                        fields.add(Tables.immutableCell(ff[0], true, "boolean"));
                        break;
                    case "false":
                        fields.add(Tables.immutableCell(ff[0], false, "boolean"));
                        break;
                    case "''":
                        fields.add(Tables.immutableCell(ff[0], "", ""));
                        break;
                    default:
                        if (ff[1].startsWith("[") && ff[1].endsWith("]")) {
                            List<Long> list = MoreFunctions.toLongList(ff[1].substring(1, ff[1].length() - 1), MoreSplitters.COMMA);
                            fields.add(Tables.immutableCell(ff[0], list, ""));
                        } else {
                            fields.add(Tables.immutableCell(ff[0], Longs.tryParse(ff[1]), ""));
                        }
                        break;
                }
            } else {
                fields.add(Tables.immutableCell(field, null, ""));
            }
        }

    }

    @Override
    public @NotNull Map<Long, Item> accept(Map<Long, Item> items) {
        if (!open) {
            return Collections.emptyMap();
        }
        //失败重试集合
        Map<Long, Item> failedItems = new HashMap<>();
        //
        Pair<RestHighLevelClient, String> client = getClient();
        BulkRequest bulk = new BulkRequest();
        Map<String, Item> indexId2Item = new HashMap<>(items.size());
        //
        for (Item item : items.values()) {
            // discart
            if (item.getOpType() == Item.OpType.DISCARD) {
                continue;
            }
            // index meta
            Map<String, Object> doc = item.getDoc();
            String id = String.valueOf(item.getId());
            if (MapUtils.isEmpty(doc)) {
                //数据库还没来得及同步的情况，需要重试
                failedItems.put(item.getId(), item);
            }//失败case，进入重试队列
            indexId2Item.put(id, item);
            // delete or upsert
            if (item.getOpType() == Item.OpType.DELETE) {
                bulk.add(newDeleteRequest(index, type, id, null));
            } else {
                Map<String, Object> document = new LinkedHashMap<>(fields.size());
                for (Table.Cell<String, Object, String> field : fields) {
                    String fieldName = field.getRowKey();
                    Object defaultValue = field.getColumnKey();
                    String fieldType = field.getValue();
                    Object value = doc.get(fieldName);
                    if (value instanceof Date) {
                        long time = ((Date) value).getTime();
                        document.put(fieldName, time);
                        continue;
                    } else if (value instanceof Number && "boolean".equals(fieldType)) {
                        boolean bool = !NumberUtils.LONG_ZERO.equals(MoreFunctions.objectToLong(value));
                        document.put(fieldName, bool);
                        continue;
                    } else if (value instanceof Collection) {
                        Collection<?> collection = ((Collection<?>) value).stream().filter(Objects::nonNull).collect(Collectors.toSet());
                        if (CollectionUtils.isNotEmpty(collection)) {
                            document.put(fieldName, collection);
                            continue;
                        }
                    } else if (value instanceof Map && MapUtils.isNotEmpty((Map<?, ?>) value)) {
                        document.put(fieldName, value);
                        continue;
                    } else if (value instanceof BigDecimal) {
                        Double decimal = ((BigDecimal) value).doubleValue();
                        document.put(fieldName, decimal);
                        continue;
                    } else if (value != null) {
                        document.put(fieldName, value);
                        continue;
                    }
                    // default value
                    if (defaultValue != null) {
                        document.put(fieldName, defaultValue);
                    }
                }
                item.getEffectDocs().put(client.getRight(), document);
                if (item.getOpType() == Item.OpType.UPDATE) {
                    bulk.add(newUpdateRequest(index, type, id, null, document));
                } else {
                    bulk.add(newUpsertRequest(index, type, id, null, document));
                }
            }
        }
        //
        if (bulk.requests().isEmpty()) {
            return Collections.emptyMap();
        }

        try {
            BulkResponse response = client.getLeft().bulk(bulk);
            for (BulkItemResponse itemResponse : response.getItems()) {
                Long id = Longs.tryParse(itemResponse.getId());
                Item item = indexId2Item.get(itemResponse.getId());
                if (itemResponse.getFailure() != null) {
                    if (itemResponse.getFailure().getStatus().getStatus() != RestStatus.NOT_FOUND.getStatus()) {
                        failedItems.put(id, item);
                    }
                    item.getResults().put(client.getRight(), String.valueOf(itemResponse.getFailure()));
                } else {
                    item.getResults().put(client.getRight(), convert(itemResponse.getResponse()));
                }
            }
            return failedItems;
        } catch (Exception e) {
            items.values().forEach(item -> item.getResults().put(client.getRight(), e.getMessage()));
            return items;
        }
    }
}
